Redshift に認証失敗したリモートホストを検出して通知したい

Redshift に認証失敗したリモートホストを検出して通知したい

Redshift のセキュリティ監視で需要があれば。
Clock Icon2020.05.15

この記事は公開されてから1年以上経過しています。情報が古い可能性がありますので、ご注意ください。

今回は Redshift に一定回数、認証失敗した接続元を検出し、通知する仕組みを実装する機会があったので紹介します。

検出条件は 「30 分間に 10 回以上、認証に失敗した通信元の検出」とします。

Redshift のログ

Redshift には 3 つのログオプションがあります。

  • 監査ログ
  • STL テーブル
  • AWS CloudTrail

今回のようにログインなどのアクティビティを調査する場合、「監査ログ」か「STL テーブル」を使用することになるかと思います。

それぞれの違いについては、以下 AWS 公式の表が解りやすくまとまっているので参照ください。

今回は直近 30 分のログを調査できれば良いので STL テーブルを利用することにしました。

長期間にわたるログであったり、負荷が高くなるようなクエリであれば Athena を使って監査ログから調べるなど、用途に応じて「監査ログ」「STL テーブル」を使い分けていただければ良いかと思います。

構成

今回の構成は以下のようにしています。

パブリック接続可能な Redshift ですが IP 制限を行っているため、VPC Lambda としています。SNS エンドポイントには NAT Gateway を抜けてアクセスします。あとは 30 分ごとに実行するために CloudWatch Events を利用します。

ネットワークまわりは各環境にあわせて変更していただければ良いかと思います。

実装内容

事前準備

以下の内容は本筋からそれるので割愛します。

  • NAT Gateway の設置
  • SNS トピックおよびサブスクリプションの設定
  • CloudWatch Events の設定
  • パラメータストアの設定
    • 構成図に描き忘れていましたが、ユーザID、パスワードは AWS Systems Manager のパラメータストアに格納しています

Lambda Layer の作成

今回は Python(boto3) で実装していますが、pandaspsycopg2 のモジュールを使っています。これらを Lambda Layer に登録しておきます。

pandas

pip コマンドでローカル環境にインストールした pandas を zip 化して Lambda Layer に登録します。

$ mkdir python

$ pip install -t ./python pandas
$ zip -r pandas.zip python

psycopg2

psycopg2 モジュールは pandas のように pip install で作成しても、動的リンク先の libpq が Lambda の実行環境に含まれていないため、以下のようなエラーになり動作しません。

Unable to import module 'lambda_function': libpq.so.5: cannot open shared object file: No such file or directory

詳細は irbbb の記事を参照ください。

記事のように自分でコンパイルしても良いのですが、今回はGithub で公開されていたコチラのコンパイル済みのパッケージを利用させていただきました。今回は Python3.7 で準備していたのでリンク先より psycopg2-3.7 ディレクトリを取得します。

/python ディレクトリ化に展開した awslambda-psycopg2 ディレクトリを psycopg2 にリネームして zip 化します。

$ tree
.
└── python
    └── psycopg2
        ├── __init__.py
        ├── _ipaddress.py
        ├── _json.py
        ├── _lru_cache.py
        ├── _psycopg.cpython-37m-x86_64-linux-gnu.so
        ├── _range.py
        ├── compat.py
        ├── errorcodes.py
        ├── errors.py
        ├── extensions.py
        ├── extras.py
        ├── pool.py
        ├── psycopg1.py
        ├── sql.py
        └── tz.py
$ zip -r psycopg2.zip python

zip 化した 2 つのモジュールを Lambda Layer に登録します。

Lambda 関数の作成

Lambda 関数は以下のように作成しました。ロールは検証のため AdministratorAccess ポリシーをアタッチしていますが、必要に応じて権限は絞り込んでください。

項目
ランタイム Python 3.7
ロール AdministratorAccess ポリシーをアタッチ
VPC Custom VPC
Lambda Layer pandas と psycopg2 を利用

環境変数

幾つかの環境変数を設定しています。

環境変数名 説明
dbname 接続するデータベース名を指定
endpoint Redshift クラスターのエンドポイントを指定
port Redshift クラスターの接続ポートを指定
user_ps ユーザ名用のパラメータストア名を指定
password_ps パスワード用のパラメータストア名を指定
threshold しきい値を指定(今回は 10 にしました)
sns_topic_arn 通知先の SNS トピック ARN を指定
env メールの Subject に含める任意の環境名を指定

コード

本題のコードは以下のとおりです。

import boto3
import os
import logging
import psycopg2
import pandas as pd

logger = logging.getLogger()
logger.setLevel(logging.INFO)

endpoint = os.environ['endpoint']
port = os.environ['port']
dbname = os.environ['dbname']
user_ps = os.environ['user_ps']
password_ps = os.environ['password_ps']
ssm = boto3.client('ssm')
sns = boto3.client('sns')

def lambda_handler(event, context):
    # SQL 文
    # 実行時点〜30分前の間の authentication failure になった通信元をカウント
    sql = """
    SELECT remotehost, count(*) FROM stl_connection_log
    WHERE event = 'authentication failure'
    AND recordtime BETWEEN (SYSDATE + INTERVAL '-30 minute') and SYSDATE
    GROUP BY remotehost;
    """
    try:
        df = sql_query(sql)
    except Exception as e:
        logger.error(e)
        raise e
    
    msg = []
    
    for index, row in df.iterrows():
        threshold = os.environ['threshold']
        if row['count'] >= int(threshold):
            logging.warning('remote host %s was exceeded threshold of authentication failure [%i times]' % (row['remotehost'], row['count']))
            m = ('%s [%i times]' % (row['remotehost'], row['count']))
            msg.append(m)

    if msg:
        sns_send_message(msg)    

# 接続情報
def get_connection():
    # ユーザ名の取得
    user = ssm.get_parameter(
        Name = user_ps,
        WithDecryption = True
        )['Parameter']['Value']
    # パスワードの取得
    password = ssm.get_parameter(
        Name = password_ps,
        WithDecryption = True
        )['Parameter']['Value']
        
    dsn = {
        "host" : endpoint,
        "port" : port,
        "database" : dbname,
        "user" : user,
        "password" : password,
    }
    
    con = psycopg2.connect(**dsn)
    return con
    
# SQL の実行
def sql_query(sql):
    with get_connection() as conn:
        return pd.read_sql(sql=sql, con=conn)

# SNS トピックに送信
def sns_send_message(msg):
        env = os.environ['env']
        topic = os.environ['sns_topic_arn']
        sendmsg = sorted(msg)
        sendmsg.insert(0, 'The following IP address was exceeded threshold of authentication failure.\nPlease check stl_connection_log\n')
        subject = ('[ALERT]:(%s) Redshift Cluster authentication failure detection.' % env)
    
        try:
            response = sns.publish(
                TopicArn = topic,
                Message = "\n".join(sendmsg),
                Subject = subject
            )
        except Exception as e:
            logger.error(e)
            raise e

今回は通信元の remotehost フィールドのみで集計していますが、要件によってデータベース名やユーザ名など複数フィールドでグループ化してください。

psycopg2 を使って pandas.read_sql() で読み込む方法については、以下の記事を参考にしましたのであわせてお読みください。

確認

上記のとおり Lambda が準備できたら、試しに該当の Redshift クラスターに対してログイン失敗を 10 回ほど行った後、Lambda のテストを実行すると以下のように接続失敗した通信元の IP アドレスと 30 分間に失敗したカウント数が通知されます。

あとは、この Lambda を CloudWatch Event で 30 分ごとに実行するように仕込めば完成です。

さいごに

今回は Redshift の STL テーブルを使って、ログイン失敗のしきい値を超えた通信元を通知する仕組みを紹介しました。

Redshift のセキュリティ監視要件等で必要があれば参考にしていただければ幸いです。

以上!大阪オフィスの丸毛(@marumo1981)でした!

Share this article

facebook logohatena logotwitter logo

© Classmethod, Inc. All rights reserved.